{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7427cbb3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | default_exp _components.test_dependencies"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5ea7e8b8",
   "metadata": {},
   "source": [
    "# Install Test Dependencies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f96418b5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "import re\n",
    "import platform\n",
    "import shutil\n",
    "import tarfile\n",
    "from contextlib import contextmanager\n",
    "from html.parser import HTMLParser\n",
    "from os import environ, rename\n",
    "from os.path import expanduser\n",
    "from pathlib import Path\n",
    "from tempfile import TemporaryDirectory\n",
    "from typing import *\n",
    "\n",
    "from packaging import version\n",
    "\n",
    "from fastkafka._components.helpers import change_dir, in_notebook\n",
    "from fastkafka._components.logger import get_logger\n",
    "\n",
    "if in_notebook():\n",
    "    from tqdm.notebook import tqdm\n",
    "else:\n",
    "    from tqdm import tqdm"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "19b144a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastkafka._components.logger import suppress_timestamps"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "68b5eebe",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "logger = get_logger(__name__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cd7f23f5",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: ok\n"
     ]
    }
   ],
   "source": [
    "suppress_timestamps()\n",
    "logger = get_logger(__name__, level=20)\n",
    "logger.info(\"ok\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d3337f8e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def check_java(*, potential_jdk_path: Optional[List[Path]] = None) -> bool:\n",
    "    \"\"\"Detects a local JDK 11 directory and makes sure `java` can be found on PATH.\n",
    "\n",
    "    Args:\n",
    "        potential_jdk_path: Candidate JDK 11 directories. When omitted, every\n",
    "            `jdk-11*` entry found under `~/.jdk` is used.\n",
    "\n",
    "    Returns:\n",
    "        bool: True when at least one candidate directory is available (the `bin`\n",
    "            folder of the first one is appended to PATH if `java` is not\n",
    "            resolvable), False when there are no candidates.\n",
    "    \"\"\"\n",
    "    candidates = potential_jdk_path\n",
    "    if candidates is None:\n",
    "        candidates = list(Path(expanduser(\"~\") + \"/.jdk\").glob(\"jdk-11*\"))\n",
    "\n",
    "    # Guard clause: without a candidate directory there is nothing to export.\n",
    "    if candidates == []:\n",
    "        return False\n",
    "\n",
    "    logger.info(\"Java is already installed.\")\n",
    "    if shutil.which(\"java\") is None:\n",
    "        logger.info(\"But not exported to PATH, exporting...\")\n",
    "        if platform.system() == \"Windows\":\n",
    "            separator = \";\"\n",
    "        else:\n",
    "            separator = \":\"\n",
    "        jdk_bin_dir = candidates[0] / \"bin\"\n",
    "        environ[\"PATH\"] = f\"{environ['PATH']}{separator}{jdk_bin_dir}\"\n",
    "    return True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1d8daf9f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: Java is already installed.\n",
      "[INFO] __main__: But not exported to PATH, exporting...\n"
     ]
    }
   ],
   "source": [
    "assert check_java()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "374f061d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def _install_java() -> None:\n",
    "    \"\"\"Installs JDK 11 through the `jdk` package unless `check_java` already finds one.\n",
    "\n",
    "    After a fresh installation the `bin` directory of the new JDK is appended\n",
    "    to PATH of the current process. Errors raised by `jdk.install` are not\n",
    "    caught here.\n",
    "\n",
    "    Returns:\n",
    "       None\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If the optional `jdk` package cannot be imported, i.e.\n",
    "            the test extras of fastkafka are not installed.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        import jdk\n",
    "    except Exception as e:\n",
    "        msg = \"Please install test version of fastkafka using 'pip install fastkafka[test]' command\"\n",
    "        logger.error(msg)\n",
    "        # Chain the original failure so the root cause stays in the traceback.\n",
    "        raise RuntimeError(msg) from e\n",
    "\n",
    "    if check_java():\n",
    "        return\n",
    "\n",
    "    logger.info(\"Installing Java...\")\n",
    "    logger.info(\" - installing jdk...\")\n",
    "    # `jdk.install` returns the JDK home directory; the executables are in `bin`.\n",
    "    jdk_home = Path(jdk.install(\"11\"))\n",
    "    logger.info(f\" - jdk path: {jdk_home}\")\n",
    "    env_path_separator = \";\" if platform.system() == \"Windows\" else \":\"\n",
    "    environ[\"PATH\"] = environ[\"PATH\"] + f\"{env_path_separator}{jdk_home / 'bin'}\"\n",
    "    logger.info(\"Java installed.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "69a5925a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: Java is already installed.\n",
      "[INFO] __main__: Java is already installed.\n"
     ]
    }
   ],
   "source": [
    "# | notest\n",
    "\n",
    "_install_java()\n",
    "assert shutil.which(\"java\")\n",
    "_install_java()\n",
    "assert shutil.which(\"java\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c3da8781",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "class VersionParser(HTMLParser):\n",
    "    \"\"\"\n",
    "    A parser class for extracting the newest version of a software from HTML data.\n",
    "\n",
    "    Every text node of the fed HTML is scanned for `X.Y.Z` version numbers and the\n",
    "    highest one seen so far is kept.\n",
    "\n",
    "    Attributes:\n",
    "        newest_version (str): The newest version found so far, \"0.0.0\" until a\n",
    "            version number has been seen.\n",
    "\n",
    "    \"\"\"\n",
    "\n",
    "    # Raw string so that `\\.` reaches the regex engine as an escaped dot instead\n",
    "    # of being an invalid string escape sequence.\n",
    "    _VERSION_PATTERN = re.compile(r\"[0-9]+\\.[0-9]+\\.[0-9]+\")\n",
    "\n",
    "    def __init__(self) -> None:\n",
    "        \"\"\"\n",
    "        Initializes a VersionParser object.\n",
    "\n",
    "        The newest_version attribute is initialized to \"0.0.0\".\n",
    "\n",
    "        \"\"\"\n",
    "        super().__init__()\n",
    "        self.newest_version = \"0.0.0\"\n",
    "\n",
    "    def handle_data(self, data: str) -> None:\n",
    "        \"\"\"\n",
    "        Handles the data encountered in the HTML parsing process.\n",
    "\n",
    "        This method is called by the HTMLParser base class for text found between\n",
    "        HTML tags. Every version number in the text is compared with\n",
    "        newest_version, which is updated whenever a higher version is found.\n",
    "\n",
    "        Args:\n",
    "            data (str): The data encountered during parsing.\n",
    "\n",
    "        \"\"\"\n",
    "        # Look at all matches, not only the first one, so that a text node that\n",
    "        # mentions several versions cannot hide the newest one.\n",
    "        for match in self._VERSION_PATTERN.finditer(data):\n",
    "            candidate = match.group(0)\n",
    "            if version.parse(self.newest_version) < version.parse(candidate):\n",
    "                self.newest_version = candidate"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "378d6d41",
   "metadata": {},
   "outputs": [],
   "source": [
    "html = \"\"\"<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 3.2 Final//EN\">\n",
    "<html>\n",
    " <head>\n",
    "  <title>Index of /kafka</title>\n",
    " </head>\n",
    " <body>\n",
    "<h1>Index of /kafka</h1>\n",
    "<pre><img src=\"/icons/blank.gif\" alt=\"Icon \"> <a href=\"?C=N;O=D\">Name</a>                    <a href=\"?C=M;O=A\">Last modified</a>      <a href=\"?C=S;O=A\">Size</a>  <a href=\"?C=D;O=A\">Description</a><hr><img src=\"/icons/back.gif\" alt=\"[PARENTDIR]\"> <a href=\"/\">Parent Directory</a>                             -   \n",
    "<img src=\"/icons/folder.gif\" alt=\"[DIR]\"> <a href=\"3.4.1/\">3.4.1/</a>                  2023-06-06 03:55    -   \n",
    "<img src=\"/icons/folder.gif\" alt=\"[DIR]\"> <a href=\"3.5.0/\">3.5.0/</a>                  2023-06-13 10:29    -   \n",
    "<img src=\"/icons/unknown.gif\" alt=\"[   ]\"> <a href=\"KEYS\">KEYS</a>                    2023-06-06 03:37  113K  \n",
    "<hr></pre>\n",
    "</body></html>\"\"\"\n",
    "\n",
    "parser = VersionParser()\n",
    "parser.feed(html)\n",
    "assert parser.newest_version == \"3.5.0\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c22b5225",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "kafka_repo_url = \"https://dlcdn.apache.org/kafka\"\n",
    "\n",
    "\n",
    "def get_kafka_version(kafka_repo_url: str = kafka_repo_url) -> str:\n",
    "    \"\"\"\n",
    "    Retrieves the newest version of Kafka from the given Kafka repository URL.\n",
    "\n",
    "    Args:\n",
    "        kafka_repo_url: The URL of the Kafka repository. Defaults to `https://dlcdn.apache.org/kafka`.\n",
    "\n",
    "    Returns:\n",
    "        The newest version of Kafka as a string.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If the requests module is not installed or no version\n",
    "            number can be found in the repository index.\n",
    "        requests.exceptions.RequestException: If the request fails or the\n",
    "            server answers with an HTTP error status.\n",
    "\n",
    "    \"\"\"\n",
    "    try:\n",
    "        import requests\n",
    "    except Exception as e:\n",
    "        msg = \"Please install test version of fastkafka using 'pip install fastkafka[test]' command\"\n",
    "        logger.error(msg)\n",
    "        raise RuntimeError(msg) from e\n",
    "\n",
    "    response = requests.get(\n",
    "        kafka_repo_url,\n",
    "        timeout=60,\n",
    "    )\n",
    "    # Fail loudly on 4xx/5xx: an error page may itself contain version-like\n",
    "    # numbers (e.g. a server signature) that would be mistaken for a Kafka version.\n",
    "    response.raise_for_status()\n",
    "\n",
    "    parser = VersionParser()\n",
    "    parser.feed(response.text)\n",
    "\n",
    "    # \"0.0.0\" is the initial value of the parser, i.e. nothing was recognised.\n",
    "    if parser.newest_version == \"0.0.0\":\n",
    "        msg = f\"Could not find any Kafka version at '{kafka_repo_url}'\"\n",
    "        logger.error(msg)\n",
    "        raise RuntimeError(msg)\n",
    "\n",
    "    return parser.newest_version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "500f640f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "local_path = (\n",
    "    Path(expanduser(\"~\")).parent / \"Public\"\n",
    "    if platform.system() == \"Windows\"\n",
    "    else Path(expanduser(\"~\")) / \".local\"\n",
    ") "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a015d2c2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def check_kafka(local_path: Path = local_path) -> bool:\n",
    "    \"\"\"Checks if Kafka is installed on the machine and exports it to PATH if necessary.\n",
    "\n",
    "    Args:\n",
    "        local_path: Directory that contains the Kafka installation. Defaults to the global variable `local_path`.\n",
    "\n",
    "    Returns:\n",
    "        bool: True if Kafka is installed and exported to PATH, False otherwise.\n",
    "    \"\"\"\n",
    "    is_windows = platform.system() == \"Windows\"\n",
    "\n",
    "    if is_windows:\n",
    "        # The Windows install lives in a fixed, version independent directory,\n",
    "        # so there is no need to look up the newest Kafka version.\n",
    "        kafka_path = local_path / \"kafka\"\n",
    "        kafka_bin_path = kafka_path / \"bin\" / \"windows\"\n",
    "        start_script = \"kafka-server-start.bat\"\n",
    "    else:\n",
    "        kafka_path = local_path / f\"kafka_2.13-{get_kafka_version()}\"\n",
    "        kafka_bin_path = kafka_path / \"bin\"\n",
    "        start_script = \"kafka-server-start.sh\"\n",
    "\n",
    "    if not (kafka_path / \"bin\").exists():\n",
    "        return False\n",
    "\n",
    "    logger.info(\"Kafka is installed.\")\n",
    "    # Probe for the platform specific launcher: on Windows the launchers are\n",
    "    # `.bat` files, so looking for the `.sh` script would not find them and\n",
    "    # every call would append the directory to PATH again.\n",
    "    if not shutil.which(start_script):\n",
    "        logger.info(\"But not exported to PATH, exporting...\")\n",
    "        path_separator = \";\" if is_windows else \":\"\n",
    "        environ[\"PATH\"] = environ[\"PATH\"] + f\"{path_separator}{kafka_bin_path}\"\n",
    "    return True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9d1dcfa4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def _install_kafka(\n",
    "    local_path: Path = local_path, kafka_repo_url: str = kafka_repo_url\n",
    ") -> None:\n",
    "    \"\"\"Checks if Kafka is installed on the machine and installs it if not.\n",
    "\n",
    "    Args:\n",
    "        local_path: Path where the Kafka installation package will be stored. Defaults to the global variable `local_path`.\n",
    "        kafka_repo_url: The URL of the Kafka repository. Defaults to `https://dlcdn.apache.org/kafka`.\n",
    "    Returns:\n",
    "       None\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If the requests module is not installed.\n",
    "        requests.exceptions.HTTPError: If the download is answered with an HTTP error status.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        import requests\n",
    "    except Exception as e:\n",
    "        msg = \"Please install test version of fastkafka using 'pip install fastkafka[test]' command\"\n",
    "        logger.error(msg)\n",
    "        raise RuntimeError(msg) from e\n",
    "\n",
    "    # Check the directory the caller asked for (not the module default) and\n",
    "    # return before any further lookup when Kafka is already there.\n",
    "    if check_kafka(local_path):\n",
    "        return\n",
    "\n",
    "    is_windows = platform.system() == \"Windows\"\n",
    "    # Resolve the version from the same repository the archive is downloaded from.\n",
    "    kafka_version = get_kafka_version(kafka_repo_url)\n",
    "    kafka_fname = f\"kafka_2.13-{kafka_version}\"\n",
    "    kafka_url = f\"{kafka_repo_url}/{kafka_version}/{kafka_fname}.tgz\"\n",
    "    tgz_path = local_path / f\"{kafka_fname}.tgz\"\n",
    "    kafka_path = local_path / \"kafka\" if is_windows else local_path / kafka_fname\n",
    "\n",
    "    logger.info(\"Installing Kafka...\")\n",
    "    local_path.mkdir(exist_ok=True, parents=True)\n",
    "\n",
    "    chunk_size = 128\n",
    "    # The context manager releases the streamed connection when the download ends.\n",
    "    with requests.get(kafka_url, stream=True, timeout=60) as response:\n",
    "        # Do not store (and later try to unpack) an HTML error page as the archive.\n",
    "        response.raise_for_status()\n",
    "        try:\n",
    "            total = response.raw.length_remaining // chunk_size\n",
    "        except Exception:\n",
    "            # The size only feeds the progress bar, so it may stay unknown.\n",
    "            total = None\n",
    "\n",
    "        with open(tgz_path, \"wb\") as f:\n",
    "            for data in tqdm(\n",
    "                response.iter_content(chunk_size=chunk_size), total=total\n",
    "            ):\n",
    "                f.write(data)\n",
    "\n",
    "    with tarfile.open(tgz_path) as tar:\n",
    "        tar.extractall(local_path)\n",
    "\n",
    "    if is_windows:\n",
    "        # On Windows the unpacked folder is moved to the fixed `kafka` directory.\n",
    "        rename(local_path / kafka_fname, kafka_path)\n",
    "\n",
    "    kafka_binary_path = (\n",
    "        f\";{kafka_path / 'bin' / 'windows'}\" if is_windows else f\":{kafka_path / 'bin'}\"\n",
    "    )\n",
    "    environ[\"PATH\"] = environ[\"PATH\"] + kafka_binary_path\n",
    "    logger.info(f\"Kafka installed in {kafka_path}.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0417b349",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: Kafka is installed.\n",
      "[INFO] __main__: But not exported to PATH, exporting...\n",
      "[INFO] __main__: Kafka is installed.\n"
     ]
    }
   ],
   "source": [
    "# | notest\n",
    "\n",
    "script_extension = \"bat\" if platform.system() == \"Windows\" else \"sh\"\n",
    "\n",
    "_install_kafka()\n",
    "assert shutil.which(f\"kafka-server-start.{script_extension}\")\n",
    "_install_kafka()\n",
    "assert shutil.which(f\"kafka-server-start.{script_extension}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eb3faa78",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def _install_testing_deps() -> None:\n",
    "    \"\"\"Sets up the external tools the tests rely on: first Java, then Kafka.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: Propagated from `_install_java` or `_install_kafka`.\n",
    "    \"\"\"\n",
    "    # Order matters: the Java step runs before the Kafka step.\n",
    "    for install_step in (_install_java, _install_kafka):\n",
    "        install_step()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "21cd9601",
   "metadata": {},
   "outputs": [],
   "source": [
    "script_extension = \"bat\" if platform.system() == \"Windows\" else \"sh\"\n",
    "\n",
    "_install_testing_deps()\n",
    "assert shutil.which(\"java\")\n",
    "assert shutil.which(f\"kafka-server-start.{script_extension}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a798e8ef",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def generate_app_src(out_path: Union[Path, str]) -> None:\n",
    "    \"\"\"Generates the source code for the test application based on a Jupyter notebook.\n",
    "\n",
    "    The notebook `099_Test_Service.ipynb` is looked up in the current working\n",
    "    directory first and in its parent directory second.\n",
    "\n",
    "    Args:\n",
    "        out_path: Path where the generated source code will be saved.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: If the Jupyter notebook file does not exist.\n",
    "    \"\"\"\n",
    "    import nbformat\n",
    "    from nbconvert import PythonExporter\n",
    "\n",
    "    notebook_name = \"099_Test_Service.ipynb\"\n",
    "    candidates = [Path(notebook_name), Path(\"..\") / notebook_name]\n",
    "    path = next((p for p in candidates if p.exists()), None)\n",
    "    if path is None:\n",
    "        # Report every location that was tried, not only the last one.\n",
    "        searched = \", \".join(f\"'{p.resolve()}'\" for p in candidates)\n",
    "        raise ValueError(f\"Notebook not found, none of these paths exists: {searched}\")\n",
    "\n",
    "    # Notebooks are UTF-8 encoded JSON; do not depend on the platform default encoding.\n",
    "    with open(path, \"r\", encoding=\"utf-8\") as f:\n",
    "        notebook = nbformat.reads(f.read(), nbformat.NO_CONVERT)\n",
    "\n",
    "    exporter = PythonExporter()\n",
    "    source, _ = exporter.from_notebook_node(notebook)\n",
    "\n",
    "    with open(out_path, \"w\", encoding=\"utf-8\") as f:\n",
    "        f.write(source)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ba8a287f",
   "metadata": {},
   "outputs": [],
   "source": [
    "with TemporaryDirectory() as d:\n",
    "    generate_app_src((Path(d) / \"main.py\"))\n",
    "    !ls -al {d}\n",
    "    !cat {d}/main.py | grep @kafka_app"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "23b0e599",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@contextmanager\n",
    "def generate_app_in_tmp() -> Generator[str, None, None]:\n",
    "    \"\"\"Writes the test application into a throw-away directory for the duration of the block.\n",
    "\n",
    "    The source is generated as `main.py` inside a `TemporaryDirectory` and the\n",
    "    value is yielded from within `change_dir(<that directory>)`.\n",
    "\n",
    "    Yields:\n",
    "        str: Import string of the generated application, `<module name>:kafka_app`.\n",
    "    \"\"\"\n",
    "    with TemporaryDirectory() as tmp_dir:\n",
    "        app_file = Path(tmp_dir) / \"main.py\"\n",
    "        generate_app_src(app_file)\n",
    "        module_name = app_file.stem\n",
    "        with change_dir(tmp_dir):\n",
    "            yield f\"{module_name}:kafka_app\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "da3cab38",
   "metadata": {},
   "outputs": [],
   "source": [
    "with generate_app_in_tmp() as actual_import_str:\n",
    "    display(actual_import_str)\n",
    "    expected_import_str = \"main:kafka_app\"\n",
    "    assert actual_import_str == expected_import_str"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "99ce89c4",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
